tokio\net\windows/
named_pipe.rs

1//! Tokio support for [Windows named pipes].
2//!
3//! [Windows named pipes]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
4
5use std::ffi::c_void;
6use std::ffi::OsStr;
7use std::io::{self, Read, Write};
8use std::pin::Pin;
9use std::ptr;
10use std::ptr::null_mut;
11use std::task::{Context, Poll};
12
13use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
14use crate::os::windows::io::{AsHandle, AsRawHandle, BorrowedHandle, FromRawHandle, RawHandle};
15
16cfg_io_util! {
17    use bytes::BufMut;
18}
19
20// Hide imports which are not used when generating documentation.
#[cfg(windows)]
mod doc {
    // Extension trait and mio types used by the code in this file.
    pub(super) use crate::os::windows::ffi::OsStrExt;
    pub(super) use mio::windows as mio_windows;

    // Flattened view of the `windows_sys` items this file relies on, one
    // re-export per API area.
    pub(super) mod windows_sys {
        pub(crate) use windows_sys::Win32::Foundation::*;
        pub(crate) use windows_sys::Win32::Storage::FileSystem::*;
        pub(crate) use windows_sys::Win32::System::Pipes::*;
        pub(crate) use windows_sys::Win32::System::SystemServices::*;
    }
}
32
33// NB: none of these shows up in public API, so don't document them.
34#[cfg(not(windows))]
35mod doc {
36    pub(super) mod mio_windows {
37        pub type NamedPipe = crate::doc::NotDefinedHere;
38    }
39}
40
41use self::doc::*;
42
43/// A [Windows named pipe] server.
44///
45/// Accepting client connections involves creating a server with
46/// [`ServerOptions::create`] and waiting for clients to connect using
47/// [`NamedPipeServer::connect`].
48///
49/// To avoid having clients sporadically fail with
50/// [`std::io::ErrorKind::NotFound`] when they connect to a server, we must
51/// ensure that at least one server instance is available at all times. This
52/// means that the typical listen loop for a server is a bit involved, because
53/// we have to ensure that we never drop a server accidentally while a client
54/// might connect.
55///
56/// So a correctly implemented server looks like this:
57///
58/// ```no_run
59/// use std::io;
60/// use tokio::net::windows::named_pipe::ServerOptions;
61///
62/// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-server";
63///
64/// # #[tokio::main] async fn main() -> std::io::Result<()> {
65/// // The first server needs to be constructed early so that clients can
66/// // be correctly connected. Otherwise calling .wait will cause the client to
67/// // error.
68/// //
69/// // Here we also make use of `first_pipe_instance`, which will ensure that
70/// // there are no other servers up and running already.
71/// let mut server = ServerOptions::new()
72///     .first_pipe_instance(true)
73///     .create(PIPE_NAME)?;
74///
75/// // Spawn the server loop.
76/// let server = tokio::spawn(async move {
77///     loop {
78///         // Wait for a client to connect.
79///         server.connect().await?;
80///         let connected_client = server;
81///
82///         // Construct the next server to be connected before sending the one
83///         // we already have off onto a task. This ensures that the server
84///         // isn't closed (after it's done in the task) before a new one is
85///         // available. Otherwise the client might error with
86///         // `io::ErrorKind::NotFound`.
87///         server = ServerOptions::new().create(PIPE_NAME)?;
88///
89///         let client = tokio::spawn(async move {
90///             /* use the connected client */
91/// #           Ok::<_, std::io::Error>(())
92///         });
93/// #       if true { break } // needed for type inference to work
94///     }
95///
96///     Ok::<_, io::Error>(())
97/// });
98///
99/// /* do something else not server related here */
100/// # Ok(()) }
101/// ```
102///
103/// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
104#[derive(Debug)]
105pub struct NamedPipeServer {
    /// The mio named pipe, wrapped in `PollEvented`, that the methods of this
    /// type operate on.
106    io: PollEvented<mio_windows::NamedPipe>,
107}
108
109impl NamedPipeServer {
110    /// Constructs a new named pipe server from the specified raw handle.
111    ///
112    /// This function will consume ownership of the handle given, passing
113    /// responsibility for closing the handle to the returned object.
114    ///
115    /// This function is also unsafe as the primitives currently returned have
116    /// the contract that they are the sole owner of the handle they
117    /// are wrapping. Usage of this function could accidentally allow violating
118    /// this contract which can cause memory unsafety in code that relies on it
119    /// being true.
120    ///
121    /// # Errors
122    ///
123    /// This errors if called outside of a [Tokio Runtime], or in a runtime that
124    /// has not [enabled I/O], or if any OS-specific I/O errors occur.
125    ///
126    /// [Tokio Runtime]: crate::runtime::Runtime
127    /// [enabled I/O]: crate::runtime::Builder::enable_io
128    pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
129        let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
130
131        Ok(Self {
132            io: PollEvented::new(named_pipe)?,
133        })
134    }
135
136    /// Retrieves information about the named pipe the server is associated
137    /// with.
138    ///
139    /// ```no_run
140    /// use tokio::net::windows::named_pipe::{PipeEnd, PipeMode, ServerOptions};
141    ///
142    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-info";
143    ///
144    /// # #[tokio::main] async fn main() -> std::io::Result<()> {
145    /// let server = ServerOptions::new()
146    ///     .pipe_mode(PipeMode::Message)
147    ///     .max_instances(5)
148    ///     .create(PIPE_NAME)?;
149    ///
150    /// let server_info = server.info()?;
151    ///
152    /// assert_eq!(server_info.end, PipeEnd::Server);
153    /// assert_eq!(server_info.mode, PipeMode::Message);
154    /// assert_eq!(server_info.max_instances, 5);
155    /// # Ok(()) }
156    /// ```
157    pub fn info(&self) -> io::Result<PipeInfo> {
158        // Safety: we're ensuring the lifetime of the named pipe.
159        unsafe { named_pipe_info(self.io.as_raw_handle()) }
160    }
161
162    /// Enables a named pipe server process to wait for a client process to
163    /// connect to an instance of a named pipe. A client process connects by
164    /// creating a named pipe with the same name.
165    ///
166    /// This corresponds to the [`ConnectNamedPipe`] system call.
167    ///
168    /// # Cancel safety
169    ///
170    /// This method is cancellation safe in the sense that if it is used as the
171    /// event in a [`select!`](crate::select) statement and some other branch
172    /// completes first, then no connection events have been lost.
173    ///
174    /// [`ConnectNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-connectnamedpipe
175    ///
176    /// # Example
177    ///
178    /// ```no_run
179    /// use tokio::net::windows::named_pipe::ServerOptions;
180    ///
181    /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
182    ///
183    /// # #[tokio::main] async fn main() -> std::io::Result<()> {
184    /// let pipe = ServerOptions::new().create(PIPE_NAME)?;
185    ///
186    /// // Wait for a client to connect.
187    /// pipe.connect().await?;
188    ///
189    /// // Use the connected client...
190    /// # Ok(()) }
191    /// ```
192    pub async fn connect(&self) -> io::Result<()> {
193        match self.io.connect() {
194            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
195                self.io
196                    .registration()
197                    .async_io(Interest::WRITABLE, || self.io.connect())
198                    .await
199            }
200            x => x,
201        }
202    }
203
204    /// Disconnects the server end of a named pipe instance from a client
205    /// process.
206    ///
207    /// ```
208    /// use tokio::io::AsyncWriteExt;
209    /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
210    /// use windows_sys::Win32::Foundation::ERROR_PIPE_NOT_CONNECTED;
211    ///
212    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-disconnect";
213    ///
214    /// # #[tokio::main] async fn main() -> std::io::Result<()> {
215    /// let server = ServerOptions::new()
216    ///     .create(PIPE_NAME)?;
217    ///
218    /// let mut client = ClientOptions::new()
219    ///     .open(PIPE_NAME)?;
220    ///
221    /// // Wait for a client to become connected.
222    /// server.connect().await?;
223    ///
224    /// // Forcibly disconnect the client.
225    /// server.disconnect()?;
226    ///
227    /// // Write fails with an OS-specific error after client has been
228    /// // disconnected.
229    /// let e = client.write(b"ping").await.unwrap_err();
230    /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_NOT_CONNECTED as i32));
231    /// # Ok(()) }
232    /// ```
233    pub fn disconnect(&self) -> io::Result<()> {
234        self.io.disconnect()
235    }
236
237    /// Waits for any of the requested ready states.
238    ///
239    /// This function is usually paired with `try_read()` or `try_write()`. It
240    /// can be used to concurrently read / write to the same pipe on a single
241    /// task without splitting the pipe.
242    ///
243    /// The function may complete without the pipe being ready. This is a
244    /// false-positive and attempting an operation will return with
245    /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
246    /// [`Ready`] set, so you should always check the returned value and possibly
247    /// wait again if the requested states are not set.
248    ///
249    /// # Examples
250    ///
251    /// Concurrently read and write to the pipe on the same task without
252    /// splitting.
253    ///
254    /// ```no_run
255    /// use tokio::io::Interest;
256    /// use tokio::net::windows::named_pipe;
257    /// use std::error::Error;
258    /// use std::io;
259    ///
260    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-ready";
261    ///
262    /// #[tokio::main]
263    /// async fn main() -> Result<(), Box<dyn Error>> {
264    ///     let server = named_pipe::ServerOptions::new()
265    ///         .create(PIPE_NAME)?;
266    ///
267    ///     loop {
268    ///         let ready = server.ready(Interest::READABLE | Interest::WRITABLE).await?;
269    ///
270    ///         if ready.is_readable() {
271    ///             let mut data = vec![0; 1024];
272    ///             // Try to read data, this may still fail with `WouldBlock`
273    ///             // if the readiness event is a false positive.
274    ///             match server.try_read(&mut data) {
275    ///                 Ok(n) => {
276    ///                     println!("read {} bytes", n);
277    ///                 }
278    ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
279    ///                     continue;
280    ///                 }
281    ///                 Err(e) => {
282    ///                     return Err(e.into());
283    ///                 }
284    ///             }
285    ///         }
286    ///
287    ///         if ready.is_writable() {
288    ///             // Try to write data, this may still fail with `WouldBlock`
289    ///             // if the readiness event is a false positive.
290    ///             match server.try_write(b"hello world") {
291    ///                 Ok(n) => {
292    ///                     println!("write {} bytes", n);
293    ///                 }
294    ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
295    ///                     continue;
296    ///                 }
297    ///                 Err(e) => {
298    ///                     return Err(e.into());
299    ///                 }
300    ///             }
301    ///         }
302    ///     }
303    /// }
304    /// ```
305    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
306        let event = self.io.registration().readiness(interest).await?;
307        Ok(event.ready)
308    }
309
310    /// Waits for the pipe to become readable.
311    ///
312    /// This function is equivalent to `ready(Interest::READABLE)` and is usually
313    /// paired with `try_read()`.
314    ///
315    /// # Examples
316    ///
317    /// ```no_run
318    /// use tokio::net::windows::named_pipe;
319    /// use std::error::Error;
320    /// use std::io;
321    ///
322    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-readable";
323    ///
324    /// #[tokio::main]
325    /// async fn main() -> Result<(), Box<dyn Error>> {
326    ///     let server = named_pipe::ServerOptions::new()
327    ///         .create(PIPE_NAME)?;
328    ///
329    ///     let mut msg = vec![0; 1024];
330    ///
331    ///     loop {
332    ///         // Wait for the pipe to be readable
333    ///         server.readable().await?;
334    ///
335    ///         // Try to read data, this may still fail with `WouldBlock`
336    ///         // if the readiness event is a false positive.
337    ///         match server.try_read(&mut msg) {
338    ///             Ok(n) => {
339    ///                 msg.truncate(n);
340    ///                 break;
341    ///             }
342    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
343    ///                 continue;
344    ///             }
345    ///             Err(e) => {
346    ///                 return Err(e.into());
347    ///             }
348    ///         }
349    ///     }
350    ///
351    ///     println!("GOT = {:?}", msg);
352    ///     Ok(())
353    /// }
354    /// ```
355    pub async fn readable(&self) -> io::Result<()> {
356        self.ready(Interest::READABLE).await?;
357        Ok(())
358    }
359
360    /// Polls for read readiness.
361    ///
362    /// If the pipe is not currently ready for reading, this method will
363    /// store a clone of the `Waker` from the provided `Context`. When the pipe
364    /// becomes ready for reading, `Waker::wake` will be called on the waker.
365    ///
366    /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
367    /// the `Waker` from the `Context` passed to the most recent call is
368    /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
369    /// second, independent waker.)
370    ///
371    /// This function is intended for cases where creating and pinning a future
372    /// via [`readable`] is not feasible. Where possible, using [`readable`] is
373    /// preferred, as this supports polling from multiple tasks at once.
374    ///
375    /// # Return value
376    ///
377    /// The function returns:
378    ///
379    /// * `Poll::Pending` if the pipe is not ready for reading.
380    /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
381    /// * `Poll::Ready(Err(e))` if an error is encountered.
382    ///
383    /// # Errors
384    ///
385    /// This function may encounter any standard I/O error except `WouldBlock`.
386    ///
387    /// [`readable`]: method@Self::readable
388    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
389        self.io.registration().poll_read_ready(cx).map_ok(|_| ())
390    }
391
392    /// Tries to read data from the pipe into the provided buffer, returning how
393    /// many bytes were read.
394    ///
395    /// Receives any pending data from the pipe but does not wait for new data
396    /// to arrive. On success, returns the number of bytes read. Because
397    /// `try_read()` is non-blocking, the buffer does not have to be stored by
398    /// the async task and can exist entirely on the stack.
399    ///
400    /// Usually, [`readable()`] or [`ready()`] is used with this function.
401    ///
402    /// [`readable()`]: NamedPipeServer::readable()
403    /// [`ready()`]: NamedPipeServer::ready()
404    ///
405    /// # Return
406    ///
407    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
408    /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
409    ///
410    /// 1. The pipe's read half is closed and will no longer yield data.
411    /// 2. The specified buffer was 0 bytes in length.
412    ///
413    /// If the pipe is not ready to read data,
414    /// `Err(io::ErrorKind::WouldBlock)` is returned.
415    ///
416    /// # Examples
417    ///
418    /// ```no_run
419    /// use tokio::net::windows::named_pipe;
420    /// use std::error::Error;
421    /// use std::io;
422    ///
423    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read";
424    ///
425    /// #[tokio::main]
426    /// async fn main() -> Result<(), Box<dyn Error>> {
427    ///     let server = named_pipe::ServerOptions::new()
428    ///         .create(PIPE_NAME)?;
429    ///
430    ///     loop {
431    ///         // Wait for the pipe to be readable
432    ///         server.readable().await?;
433    ///
434    ///         // Creating the buffer **after** the `await` prevents it from
435    ///         // being stored in the async task.
436    ///         let mut buf = [0; 4096];
437    ///
438    ///         // Try to read data, this may still fail with `WouldBlock`
439    ///         // if the readiness event is a false positive.
440    ///         match server.try_read(&mut buf) {
441    ///             Ok(0) => break,
442    ///             Ok(n) => {
443    ///                 println!("read {} bytes", n);
444    ///             }
445    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
446    ///                 continue;
447    ///             }
448    ///             Err(e) => {
449    ///                 return Err(e.into());
450    ///             }
451    ///         }
452    ///     }
453    ///
454    ///     Ok(())
455    /// }
456    /// ```
457    pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
458        self.io
459            .registration()
460            .try_io(Interest::READABLE, || (&*self.io).read(buf))
461    }
462
463    /// Tries to read data from the pipe into the provided buffers, returning
464    /// how many bytes were read.
465    ///
466    /// Data is copied to fill each buffer in order, with the final buffer
467    /// written to possibly being only partially filled. This method behaves
468    /// equivalently to a single call to [`try_read()`] with concatenated
469    /// buffers.
470    ///
471    /// Receives any pending data from the pipe but does not wait for new data
472    /// to arrive. On success, returns the number of bytes read. Because
473    /// `try_read_vectored()` is non-blocking, the buffer does not have to be
474    /// stored by the async task and can exist entirely on the stack.
475    ///
476    /// Usually, [`readable()`] or [`ready()`] is used with this function.
477    ///
478    /// [`try_read()`]: NamedPipeServer::try_read()
479    /// [`readable()`]: NamedPipeServer::readable()
480    /// [`ready()`]: NamedPipeServer::ready()
481    ///
482    /// # Return
483    ///
484    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
485    /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
486    /// and will no longer yield data. If the pipe is not ready to read data
487    /// `Err(io::ErrorKind::WouldBlock)` is returned.
488    ///
489    /// # Examples
490    ///
491    /// ```no_run
492    /// use tokio::net::windows::named_pipe;
493    /// use std::error::Error;
494    /// use std::io::{self, IoSliceMut};
495    ///
496    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read-vectored";
497    ///
498    /// #[tokio::main]
499    /// async fn main() -> Result<(), Box<dyn Error>> {
500    ///     let server = named_pipe::ServerOptions::new()
501    ///         .create(PIPE_NAME)?;
502    ///
503    ///     loop {
504    ///         // Wait for the pipe to be readable
505    ///         server.readable().await?;
506    ///
507    ///         // Creating the buffer **after** the `await` prevents it from
508    ///         // being stored in the async task.
509    ///         let mut buf_a = [0; 512];
510    ///         let mut buf_b = [0; 1024];
511    ///         let mut bufs = [
512    ///             IoSliceMut::new(&mut buf_a),
513    ///             IoSliceMut::new(&mut buf_b),
514    ///         ];
515    ///
516    ///         // Try to read data, this may still fail with `WouldBlock`
517    ///         // if the readiness event is a false positive.
518    ///         match server.try_read_vectored(&mut bufs) {
519    ///             Ok(0) => break,
520    ///             Ok(n) => {
521    ///                 println!("read {} bytes", n);
522    ///             }
523    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
524    ///                 continue;
525    ///             }
526    ///             Err(e) => {
527    ///                 return Err(e.into());
528    ///             }
529    ///         }
530    ///     }
531    ///
532    ///     Ok(())
533    /// }
534    /// ```
535    pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
536        self.io
537            .registration()
538            .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
539    }
540
541    cfg_io_util! {
542        /// Tries to read data from the pipe into the provided buffer, advancing the
543        /// buffer's internal cursor, returning how many bytes were read.
544        ///
545        /// Receives any pending data from the pipe but does not wait for new data
546        /// to arrive. On success, returns the number of bytes read. Because
547        /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
548        /// the async task and can exist entirely on the stack.
549        ///
550        /// Usually, [`readable()`] or [`ready()`] is used with this function.
551        ///
552        /// [`readable()`]: NamedPipeServer::readable()
553        /// [`ready()`]: NamedPipeServer::ready()
554        ///
555        /// # Return
556        ///
557        /// If data is successfully read, `Ok(n)` is returned, where `n` is the
558        /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
559        /// and will no longer yield data. If the pipe is not ready to read data
560        /// `Err(io::ErrorKind::WouldBlock)` is returned.
561        ///
562        /// # Examples
563        ///
564        /// ```no_run
565        /// use tokio::net::windows::named_pipe;
566        /// use std::error::Error;
567        /// use std::io;
568        ///
569        /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
570        ///
571        /// #[tokio::main]
572        /// async fn main() -> Result<(), Box<dyn Error>> {
573        ///     let server = named_pipe::ServerOptions::new().create(PIPE_NAME)?;
574        ///
575        ///     loop {
576        ///         // Wait for the pipe to be readable
577        ///         server.readable().await?;
578        ///
579        ///         let mut buf = Vec::with_capacity(4096);
580        ///
581        ///         // Try to read data, this may still fail with `WouldBlock`
582        ///         // if the readiness event is a false positive.
583        ///         match server.try_read_buf(&mut buf) {
584        ///             Ok(0) => break,
585        ///             Ok(n) => {
586        ///                 println!("read {} bytes", n);
587        ///             }
588        ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
589        ///                 continue;
590        ///             }
591        ///             Err(e) => {
592        ///                 return Err(e.into());
593        ///             }
594        ///         }
595        ///     }
596        ///
597        ///     Ok(())
598        /// }
599        /// ```
600        pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
601            self.io.registration().try_io(Interest::READABLE, || {
602                use std::io::Read;
603
604                let dst = buf.chunk_mut();
605                let dst =
606                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
607
608                // Safety: We trust `NamedPipeServer::read` to have filled up `n` bytes in the
609                // buffer.
610                let n = (&*self.io).read(dst)?;
611
612                unsafe {
613                    buf.advance_mut(n);
614                }
615
616                Ok(n)
617            })
618        }
619    }
620
621    /// Waits for the pipe to become writable.
622    ///
623    /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
624    /// paired with `try_write()`.
625    ///
626    /// # Examples
627    ///
628    /// ```no_run
629    /// use tokio::net::windows::named_pipe;
630    /// use std::error::Error;
631    /// use std::io;
632    ///
633    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-writable";
634    ///
635    /// #[tokio::main]
636    /// async fn main() -> Result<(), Box<dyn Error>> {
637    ///     let server = named_pipe::ServerOptions::new()
638    ///         .create(PIPE_NAME)?;
639    ///
640    ///     loop {
641    ///         // Wait for the pipe to be writable
642    ///         server.writable().await?;
643    ///
644    ///         // Try to write data, this may still fail with `WouldBlock`
645    ///         // if the readiness event is a false positive.
646    ///         match server.try_write(b"hello world") {
647    ///             Ok(n) => {
648    ///                 break;
649    ///             }
650    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
651    ///                 continue;
652    ///             }
653    ///             Err(e) => {
654    ///                 return Err(e.into());
655    ///             }
656    ///         }
657    ///     }
658    ///
659    ///     Ok(())
660    /// }
661    /// ```
662    pub async fn writable(&self) -> io::Result<()> {
663        self.ready(Interest::WRITABLE).await?;
664        Ok(())
665    }
666
667    /// Polls for write readiness.
668    ///
669    /// If the pipe is not currently ready for writing, this method will
670    /// store a clone of the `Waker` from the provided `Context`. When the pipe
671    /// becomes ready for writing, `Waker::wake` will be called on the waker.
672    ///
673    /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
674    /// the `Waker` from the `Context` passed to the most recent call is
675    /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
676    /// second, independent waker.)
677    ///
678    /// This function is intended for cases where creating and pinning a future
679    /// via [`writable`] is not feasible. Where possible, using [`writable`] is
680    /// preferred, as this supports polling from multiple tasks at once.
681    ///
682    /// # Return value
683    ///
684    /// The function returns:
685    ///
686    /// * `Poll::Pending` if the pipe is not ready for writing.
687    /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
688    /// * `Poll::Ready(Err(e))` if an error is encountered.
689    ///
690    /// # Errors
691    ///
692    /// This function may encounter any standard I/O error except `WouldBlock`.
693    ///
694    /// [`writable`]: method@Self::writable
695    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
696        self.io.registration().poll_write_ready(cx).map_ok(|_| ())
697    }
698
699    /// Tries to write a buffer to the pipe, returning how many bytes were
700    /// written.
701    ///
702    /// The function will attempt to write the entire contents of `buf`, but
703    /// only part of the buffer may be written.
704    ///
705    /// This function is usually paired with `writable()`.
706    ///
707    /// # Return
708    ///
709    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
710    /// number of bytes written. If the pipe is not ready to write data,
711    /// `Err(io::ErrorKind::WouldBlock)` is returned.
712    ///
713    /// # Examples
714    ///
715    /// ```no_run
716    /// use tokio::net::windows::named_pipe;
717    /// use std::error::Error;
718    /// use std::io;
719    ///
720    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write";
721    ///
722    /// #[tokio::main]
723    /// async fn main() -> Result<(), Box<dyn Error>> {
724    ///     let server = named_pipe::ServerOptions::new()
725    ///         .create(PIPE_NAME)?;
726    ///
727    ///     loop {
728    ///         // Wait for the pipe to be writable
729    ///         server.writable().await?;
730    ///
731    ///         // Try to write data, this may still fail with `WouldBlock`
732    ///         // if the readiness event is a false positive.
733    ///         match server.try_write(b"hello world") {
734    ///             Ok(n) => {
735    ///                 break;
736    ///             }
737    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
738    ///                 continue;
739    ///             }
740    ///             Err(e) => {
741    ///                 return Err(e.into());
742    ///             }
743    ///         }
744    ///     }
745    ///
746    ///     Ok(())
747    /// }
748    /// ```
749    pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
750        self.io
751            .registration()
752            .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
753    }
754
755    /// Tries to write several buffers to the pipe, returning how many bytes
756    /// were written.
757    ///
758    /// Data is written from each buffer in order, with the final buffer read
759    /// from possibly being only partially consumed. This method behaves
760    /// equivalently to a single call to [`try_write()`] with concatenated
761    /// buffers.
762    ///
763    /// This function is usually paired with `writable()`.
764    ///
765    /// [`try_write()`]: NamedPipeServer::try_write()
766    ///
767    /// # Return
768    ///
769    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
770    /// number of bytes written. If the pipe is not ready to write data,
771    /// `Err(io::ErrorKind::WouldBlock)` is returned.
772    ///
773    /// # Examples
774    ///
775    /// ```no_run
776    /// use tokio::net::windows::named_pipe;
777    /// use std::error::Error;
778    /// use std::io;
779    ///
780    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write-vectored";
781    ///
782    /// #[tokio::main]
783    /// async fn main() -> Result<(), Box<dyn Error>> {
784    ///     let server = named_pipe::ServerOptions::new()
785    ///         .create(PIPE_NAME)?;
786    ///
787    ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
788    ///
789    ///     loop {
790    ///         // Wait for the pipe to be writable
791    ///         server.writable().await?;
792    ///
793    ///         // Try to write data, this may still fail with `WouldBlock`
794    ///         // if the readiness event is a false positive.
795    ///         match server.try_write_vectored(&bufs) {
796    ///             Ok(n) => {
797    ///                 break;
798    ///             }
799    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
800    ///                 continue;
801    ///             }
802    ///             Err(e) => {
803    ///                 return Err(e.into());
804    ///             }
805    ///         }
806    ///     }
807    ///
808    ///     Ok(())
809    /// }
810    /// ```
811    pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
812        self.io
813            .registration()
814            .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
815    }
816
817    /// Tries to read or write from the pipe using a user-provided IO operation.
818    ///
819    /// If the pipe is ready, the provided closure is called. The closure
820    /// should attempt to perform IO operation from the pipe by manually
821    /// calling the appropriate syscall. If the operation fails because the
822    /// pipe is not actually ready, then the closure should return a
823    /// `WouldBlock` error and the readiness flag is cleared. The return value
824    /// of the closure is then returned by `try_io`.
825    ///
826    /// If the pipe is not ready, then the closure is not called
827    /// and a `WouldBlock` error is returned.
828    ///
829    /// The closure should only return a `WouldBlock` error if it has performed
830    /// an IO operation on the pipe that failed due to the pipe not being
831    /// ready. Returning a `WouldBlock` error in any other situation will
832    /// incorrectly clear the readiness flag, which can cause the pipe to
833    /// behave incorrectly.
834    ///
835    /// The closure should not perform the IO operation using any of the
836    /// methods defined on the Tokio `NamedPipeServer` type, as this will mess with
837    /// the readiness flag and can cause the pipe to behave incorrectly.
838    ///
839    /// This method is not intended to be used with combined interests.
840    /// The closure should perform only one type of IO operation, so it should not
841    /// require more than one ready state. This method may panic or sleep forever
842    /// if it is called with a combined interest.
843    ///
844    /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
845    ///
846    /// [`readable()`]: NamedPipeServer::readable()
847    /// [`writable()`]: NamedPipeServer::writable()
848    /// [`ready()`]: NamedPipeServer::ready()
849    pub fn try_io<R>(
850        &self,
851        interest: Interest,
852        f: impl FnOnce() -> io::Result<R>,
853    ) -> io::Result<R> {
854        self.io.registration().try_io(interest, f)
855    }
856
857    /// Reads or writes from the pipe using a user-provided IO operation.
858    ///
859    /// The readiness of the pipe is awaited and when the pipe is ready,
860    /// the provided closure is called. The closure should attempt to perform
861    /// IO operation on the pipe by manually calling the appropriate syscall.
862    /// If the operation fails because the pipe is not actually ready,
863    /// then the closure should return a `WouldBlock` error. In such case the
864    /// readiness flag is cleared and the pipe readiness is awaited again.
865    /// This loop is repeated until the closure returns an `Ok` or an error
866    /// other than `WouldBlock`.
867    ///
868    /// The closure should only return a `WouldBlock` error if it has performed
869    /// an IO operation on the pipe that failed due to the pipe not being
870    /// ready. Returning a `WouldBlock` error in any other situation will
871    /// incorrectly clear the readiness flag, which can cause the pipe to
872    /// behave incorrectly.
873    ///
874    /// The closure should not perform the IO operation using any of the methods
875    /// defined on the Tokio `NamedPipeServer` type, as this will mess with the
876    /// readiness flag and can cause the pipe to behave incorrectly.
877    ///
878    /// This method is not intended to be used with combined interests.
879    /// The closure should perform only one type of IO operation, so it should not
880    /// require more than one ready state. This method may panic or sleep forever
881    /// if it is called with a combined interest.
882    pub async fn async_io<R>(
883        &self,
884        interest: Interest,
885        f: impl FnMut() -> io::Result<R>,
886    ) -> io::Result<R> {
887        self.io.registration().async_io(interest, f).await
888    }
889}
890
891impl AsyncRead for NamedPipeServer {
892    fn poll_read(
893        self: Pin<&mut Self>,
894        cx: &mut Context<'_>,
895        buf: &mut ReadBuf<'_>,
896    ) -> Poll<io::Result<()>> {
897        unsafe { self.io.poll_read(cx, buf) }
898    }
899}
900
901impl AsyncWrite for NamedPipeServer {
902    fn poll_write(
903        self: Pin<&mut Self>,
904        cx: &mut Context<'_>,
905        buf: &[u8],
906    ) -> Poll<io::Result<usize>> {
907        self.io.poll_write(cx, buf)
908    }
909
910    fn poll_write_vectored(
911        self: Pin<&mut Self>,
912        cx: &mut Context<'_>,
913        bufs: &[io::IoSlice<'_>],
914    ) -> Poll<io::Result<usize>> {
915        self.io.poll_write_vectored(cx, bufs)
916    }
917
918    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
919        Poll::Ready(Ok(()))
920    }
921
922    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
923        self.poll_flush(cx)
924    }
925}
926
927impl AsRawHandle for NamedPipeServer {
928    fn as_raw_handle(&self) -> RawHandle {
929        self.io.as_raw_handle()
930    }
931}
932
933impl AsHandle for NamedPipeServer {
934    fn as_handle(&self) -> BorrowedHandle<'_> {
935        unsafe { BorrowedHandle::borrow_raw(self.as_raw_handle()) }
936    }
937}
938
939/// A [Windows named pipe] client.
940///
941/// Constructed using [`ClientOptions::open`].
942///
943/// Connecting a client correctly involves a few steps. When connecting through
944/// [`ClientOptions::open`], it might error indicating one of two things:
945///
946/// * [`std::io::ErrorKind::NotFound`] - There is no server available.
947/// * [`ERROR_PIPE_BUSY`] - There is a server available, but it is busy. Sleep
948///   for a while and try again.
949///
950/// So a correctly implemented client looks like this:
951///
952/// ```no_run
953/// use std::time::Duration;
954/// use tokio::net::windows::named_pipe::ClientOptions;
955/// use tokio::time;
956/// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
957///
958/// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-client";
959///
960/// # #[tokio::main] async fn main() -> std::io::Result<()> {
961/// let client = loop {
962///     match ClientOptions::new().open(PIPE_NAME) {
963///         Ok(client) => break client,
964///         Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
965///         Err(e) => return Err(e),
966///     }
967///
968///     time::sleep(Duration::from_millis(50)).await;
969/// };
970///
971/// /* use the connected client */
972/// # Ok(()) }
973/// ```
974///
975/// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
976/// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
#[derive(Debug)]
pub struct NamedPipeClient {
    // The mio named pipe wrapped in `PollEvented`. The client's readiness and
    // I/O methods all operate on this field.
    io: PollEvented<mio_windows::NamedPipe>,
}
981
982impl NamedPipeClient {
983    /// Constructs a new named pipe client from the specified raw handle.
984    ///
985    /// This function will consume ownership of the handle given, passing
986    /// responsibility for closing the handle to the returned object.
987    ///
    /// This function is also unsafe as the primitives currently returned have
    /// the contract that they are the sole owner of the handle they are
    /// wrapping. Usage of this function could accidentally allow violating
    /// this contract which can cause memory unsafety in code that relies on it
    /// being true.
993    ///
994    /// # Errors
995    ///
996    /// This errors if called outside of a [Tokio Runtime], or in a runtime that
997    /// has not [enabled I/O], or if any OS-specific I/O errors occur.
998    ///
999    /// [Tokio Runtime]: crate::runtime::Runtime
1000    /// [enabled I/O]: crate::runtime::Builder::enable_io
1001    pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
1002        let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
1003
1004        Ok(Self {
1005            io: PollEvented::new(named_pipe)?,
1006        })
1007    }
1008
1009    /// Retrieves information about the named pipe the client is associated
1010    /// with.
1011    ///
1012    /// ```no_run
1013    /// use tokio::net::windows::named_pipe::{ClientOptions, PipeEnd, PipeMode};
1014    ///
1015    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-info";
1016    ///
1017    /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1018    /// let client = ClientOptions::new()
1019    ///     .open(PIPE_NAME)?;
1020    ///
1021    /// let client_info = client.info()?;
1022    ///
1023    /// assert_eq!(client_info.end, PipeEnd::Client);
1024    /// assert_eq!(client_info.mode, PipeMode::Message);
1025    /// assert_eq!(client_info.max_instances, 5);
1026    /// # Ok(()) }
1027    /// ```
1028    pub fn info(&self) -> io::Result<PipeInfo> {
1029        // Safety: we're ensuring the lifetime of the named pipe.
1030        unsafe { named_pipe_info(self.io.as_raw_handle()) }
1031    }
1032
1033    /// Waits for any of the requested ready states.
1034    ///
1035    /// This function is usually paired with `try_read()` or `try_write()`. It
1036    /// can be used to concurrently read / write to the same pipe on a single
1037    /// task without splitting the pipe.
1038    ///
1039    /// The function may complete without the pipe being ready. This is a
1040    /// false-positive and attempting an operation will return with
1041    /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
1042    /// [`Ready`] set, so you should always check the returned value and possibly
1043    /// wait again if the requested states are not set.
1044    ///
1045    /// # Examples
1046    ///
1047    /// Concurrently read and write to the pipe on the same task without
1048    /// splitting.
1049    ///
1050    /// ```no_run
1051    /// use tokio::io::Interest;
1052    /// use tokio::net::windows::named_pipe;
1053    /// use std::error::Error;
1054    /// use std::io;
1055    ///
1056    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-ready";
1057    ///
1058    /// #[tokio::main]
1059    /// async fn main() -> Result<(), Box<dyn Error>> {
1060    ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1061    ///
1062    ///     loop {
1063    ///         let ready = client.ready(Interest::READABLE | Interest::WRITABLE).await?;
1064    ///
1065    ///         if ready.is_readable() {
1066    ///             let mut data = vec![0; 1024];
1067    ///             // Try to read data, this may still fail with `WouldBlock`
1068    ///             // if the readiness event is a false positive.
1069    ///             match client.try_read(&mut data) {
1070    ///                 Ok(n) => {
1071    ///                     println!("read {} bytes", n);
1072    ///                 }
1073    ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1074    ///                     continue;
1075    ///                 }
1076    ///                 Err(e) => {
1077    ///                     return Err(e.into());
1078    ///                 }
1079    ///             }
1080    ///         }
1081    ///
1082    ///         if ready.is_writable() {
1083    ///             // Try to write data, this may still fail with `WouldBlock`
1084    ///             // if the readiness event is a false positive.
1085    ///             match client.try_write(b"hello world") {
1086    ///                 Ok(n) => {
1087    ///                     println!("write {} bytes", n);
1088    ///                 }
1089    ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1090    ///                     continue;
1091    ///                 }
1092    ///                 Err(e) => {
1093    ///                     return Err(e.into());
1094    ///                 }
1095    ///             }
1096    ///         }
1097    ///     }
1098    /// }
1099    /// ```
1100    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
1101        let event = self.io.registration().readiness(interest).await?;
1102        Ok(event.ready)
1103    }
1104
1105    /// Waits for the pipe to become readable.
1106    ///
1107    /// This function is equivalent to `ready(Interest::READABLE)` and is usually
1108    /// paired with `try_read()`.
1109    ///
1110    /// # Examples
1111    ///
1112    /// ```no_run
1113    /// use tokio::net::windows::named_pipe;
1114    /// use std::error::Error;
1115    /// use std::io;
1116    ///
1117    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
1118    ///
1119    /// #[tokio::main]
1120    /// async fn main() -> Result<(), Box<dyn Error>> {
1121    ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1122    ///
1123    ///     let mut msg = vec![0; 1024];
1124    ///
1125    ///     loop {
1126    ///         // Wait for the pipe to be readable
1127    ///         client.readable().await?;
1128    ///
1129    ///         // Try to read data, this may still fail with `WouldBlock`
1130    ///         // if the readiness event is a false positive.
1131    ///         match client.try_read(&mut msg) {
1132    ///             Ok(n) => {
1133    ///                 msg.truncate(n);
1134    ///                 break;
1135    ///             }
1136    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1137    ///                 continue;
1138    ///             }
1139    ///             Err(e) => {
1140    ///                 return Err(e.into());
1141    ///             }
1142    ///         }
1143    ///     }
1144    ///
1145    ///     println!("GOT = {:?}", msg);
1146    ///     Ok(())
1147    /// }
1148    /// ```
1149    pub async fn readable(&self) -> io::Result<()> {
1150        self.ready(Interest::READABLE).await?;
1151        Ok(())
1152    }
1153
1154    /// Polls for read readiness.
1155    ///
1156    /// If the pipe is not currently ready for reading, this method will
1157    /// store a clone of the `Waker` from the provided `Context`. When the pipe
1158    /// becomes ready for reading, `Waker::wake` will be called on the waker.
1159    ///
1160    /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
1161    /// the `Waker` from the `Context` passed to the most recent call is
1162    /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
1163    /// second, independent waker.)
1164    ///
1165    /// This function is intended for cases where creating and pinning a future
1166    /// via [`readable`] is not feasible. Where possible, using [`readable`] is
1167    /// preferred, as this supports polling from multiple tasks at once.
1168    ///
1169    /// # Return value
1170    ///
1171    /// The function returns:
1172    ///
1173    /// * `Poll::Pending` if the pipe is not ready for reading.
1174    /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
1175    /// * `Poll::Ready(Err(e))` if an error is encountered.
1176    ///
1177    /// # Errors
1178    ///
1179    /// This function may encounter any standard I/O error except `WouldBlock`.
1180    ///
1181    /// [`readable`]: method@Self::readable
1182    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1183        self.io.registration().poll_read_ready(cx).map_ok(|_| ())
1184    }
1185
1186    /// Tries to read data from the pipe into the provided buffer, returning how
1187    /// many bytes were read.
1188    ///
1189    /// Receives any pending data from the pipe but does not wait for new data
1190    /// to arrive. On success, returns the number of bytes read. Because
1191    /// `try_read()` is non-blocking, the buffer does not have to be stored by
1192    /// the async task and can exist entirely on the stack.
1193    ///
1194    /// Usually, [`readable()`] or [`ready()`] is used with this function.
1195    ///
1196    /// [`readable()`]: NamedPipeClient::readable()
1197    /// [`ready()`]: NamedPipeClient::ready()
1198    ///
1199    /// # Return
1200    ///
1201    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1202    /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
1203    ///
1204    /// 1. The pipe's read half is closed and will no longer yield data.
1205    /// 2. The specified buffer was 0 bytes in length.
1206    ///
1207    /// If the pipe is not ready to read data,
1208    /// `Err(io::ErrorKind::WouldBlock)` is returned.
1209    ///
1210    /// # Examples
1211    ///
1212    /// ```no_run
1213    /// use tokio::net::windows::named_pipe;
1214    /// use std::error::Error;
1215    /// use std::io;
1216    ///
1217    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read";
1218    ///
1219    /// #[tokio::main]
1220    /// async fn main() -> Result<(), Box<dyn Error>> {
1221    ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1222    ///
1223    ///     loop {
1224    ///         // Wait for the pipe to be readable
1225    ///         client.readable().await?;
1226    ///
1227    ///         // Creating the buffer **after** the `await` prevents it from
1228    ///         // being stored in the async task.
1229    ///         let mut buf = [0; 4096];
1230    ///
1231    ///         // Try to read data, this may still fail with `WouldBlock`
1232    ///         // if the readiness event is a false positive.
1233    ///         match client.try_read(&mut buf) {
1234    ///             Ok(0) => break,
1235    ///             Ok(n) => {
1236    ///                 println!("read {} bytes", n);
1237    ///             }
1238    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1239    ///                 continue;
1240    ///             }
1241    ///             Err(e) => {
1242    ///                 return Err(e.into());
1243    ///             }
1244    ///         }
1245    ///     }
1246    ///
1247    ///     Ok(())
1248    /// }
1249    /// ```
1250    pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
1251        self.io
1252            .registration()
1253            .try_io(Interest::READABLE, || (&*self.io).read(buf))
1254    }
1255
1256    /// Tries to read data from the pipe into the provided buffers, returning
1257    /// how many bytes were read.
1258    ///
1259    /// Data is copied to fill each buffer in order, with the final buffer
1260    /// written to possibly being only partially filled. This method behaves
1261    /// equivalently to a single call to [`try_read()`] with concatenated
1262    /// buffers.
1263    ///
1264    /// Receives any pending data from the pipe but does not wait for new data
1265    /// to arrive. On success, returns the number of bytes read. Because
1266    /// `try_read_vectored()` is non-blocking, the buffer does not have to be
1267    /// stored by the async task and can exist entirely on the stack.
1268    ///
1269    /// Usually, [`readable()`] or [`ready()`] is used with this function.
1270    ///
1271    /// [`try_read()`]: NamedPipeClient::try_read()
1272    /// [`readable()`]: NamedPipeClient::readable()
1273    /// [`ready()`]: NamedPipeClient::ready()
1274    ///
1275    /// # Return
1276    ///
1277    /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1278    /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
1279    /// and will no longer yield data. If the pipe is not ready to read data
1280    /// `Err(io::ErrorKind::WouldBlock)` is returned.
1281    ///
1282    /// # Examples
1283    ///
1284    /// ```no_run
1285    /// use tokio::net::windows::named_pipe;
1286    /// use std::error::Error;
1287    /// use std::io::{self, IoSliceMut};
1288    ///
1289    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read-vectored";
1290    ///
1291    /// #[tokio::main]
1292    /// async fn main() -> Result<(), Box<dyn Error>> {
1293    ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1294    ///
1295    ///     loop {
1296    ///         // Wait for the pipe to be readable
1297    ///         client.readable().await?;
1298    ///
1299    ///         // Creating the buffer **after** the `await` prevents it from
1300    ///         // being stored in the async task.
1301    ///         let mut buf_a = [0; 512];
1302    ///         let mut buf_b = [0; 1024];
1303    ///         let mut bufs = [
1304    ///             IoSliceMut::new(&mut buf_a),
1305    ///             IoSliceMut::new(&mut buf_b),
1306    ///         ];
1307    ///
1308    ///         // Try to read data, this may still fail with `WouldBlock`
1309    ///         // if the readiness event is a false positive.
1310    ///         match client.try_read_vectored(&mut bufs) {
1311    ///             Ok(0) => break,
1312    ///             Ok(n) => {
1313    ///                 println!("read {} bytes", n);
1314    ///             }
1315    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1316    ///                 continue;
1317    ///             }
1318    ///             Err(e) => {
1319    ///                 return Err(e.into());
1320    ///             }
1321    ///         }
1322    ///     }
1323    ///
1324    ///     Ok(())
1325    /// }
1326    /// ```
1327    pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
1328        self.io
1329            .registration()
1330            .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
1331    }
1332
1333    cfg_io_util! {
        /// Tries to read data from the pipe into the provided buffer, advancing the
        /// buffer's internal cursor, returning how many bytes were read.
1336        ///
1337        /// Receives any pending data from the pipe but does not wait for new data
1338        /// to arrive. On success, returns the number of bytes read. Because
1339        /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
1340        /// the async task and can exist entirely on the stack.
1341        ///
1342        /// Usually, [`readable()`] or [`ready()`] is used with this function.
1343        ///
1344        /// [`readable()`]: NamedPipeClient::readable()
1345        /// [`ready()`]: NamedPipeClient::ready()
1346        ///
1347        /// # Return
1348        ///
1349        /// If data is successfully read, `Ok(n)` is returned, where `n` is the
        /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
        /// and will no longer yield data. If the pipe is not ready to read data
1352        /// `Err(io::ErrorKind::WouldBlock)` is returned.
1353        ///
1354        /// # Examples
1355        ///
1356        /// ```no_run
1357        /// use tokio::net::windows::named_pipe;
1358        /// use std::error::Error;
1359        /// use std::io;
1360        ///
1361        /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
1362        ///
1363        /// #[tokio::main]
1364        /// async fn main() -> Result<(), Box<dyn Error>> {
1365        ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1366        ///
1367        ///     loop {
1368        ///         // Wait for the pipe to be readable
1369        ///         client.readable().await?;
1370        ///
1371        ///         let mut buf = Vec::with_capacity(4096);
1372        ///
1373        ///         // Try to read data, this may still fail with `WouldBlock`
1374        ///         // if the readiness event is a false positive.
1375        ///         match client.try_read_buf(&mut buf) {
1376        ///             Ok(0) => break,
1377        ///             Ok(n) => {
1378        ///                 println!("read {} bytes", n);
1379        ///             }
1380        ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1381        ///                 continue;
1382        ///             }
1383        ///             Err(e) => {
1384        ///                 return Err(e.into());
1385        ///             }
1386        ///         }
1387        ///     }
1388        ///
1389        ///     Ok(())
1390        /// }
1391        /// ```
        pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
            // The whole read-and-advance sequence runs inside `try_io` with
            // `Interest::READABLE`.
            self.io.registration().try_io(Interest::READABLE, || {
                use std::io::Read;

                // Borrow the next writable chunk of `buf`.
                let dst = buf.chunk_mut();
                // Reinterpret the chunk as `&mut [u8]` so it can be handed to `read`.
                // NOTE(review): the chunk may be uninitialized memory; this presumably
                // relies on the pipe's `read` only ever writing into `dst` — confirm.
                let dst =
                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };

                // A failed read returns early via `?`, leaving `buf`'s cursor untouched.
                let n = (&*self.io).read(dst)?;

                // Safety: We trust `NamedPipeClient::read` to have filled up `n` bytes in the
                // buffer.
                unsafe {
                    buf.advance_mut(n);
                }

                Ok(n)
            })
        }
1411    }
1412
1413    /// Waits for the pipe to become writable.
1414    ///
1415    /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
1416    /// paired with `try_write()`.
1417    ///
1418    /// # Examples
1419    ///
1420    /// ```no_run
1421    /// use tokio::net::windows::named_pipe;
1422    /// use std::error::Error;
1423    /// use std::io;
1424    ///
1425    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-writable";
1426    ///
1427    /// #[tokio::main]
1428    /// async fn main() -> Result<(), Box<dyn Error>> {
1429    ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1430    ///
1431    ///     loop {
1432    ///         // Wait for the pipe to be writable
1433    ///         client.writable().await?;
1434    ///
1435    ///         // Try to write data, this may still fail with `WouldBlock`
1436    ///         // if the readiness event is a false positive.
1437    ///         match client.try_write(b"hello world") {
1438    ///             Ok(n) => {
1439    ///                 break;
1440    ///             }
1441    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1442    ///                 continue;
1443    ///             }
1444    ///             Err(e) => {
1445    ///                 return Err(e.into());
1446    ///             }
1447    ///         }
1448    ///     }
1449    ///
1450    ///     Ok(())
1451    /// }
1452    /// ```
1453    pub async fn writable(&self) -> io::Result<()> {
1454        self.ready(Interest::WRITABLE).await?;
1455        Ok(())
1456    }
1457
1458    /// Polls for write readiness.
1459    ///
1460    /// If the pipe is not currently ready for writing, this method will
1461    /// store a clone of the `Waker` from the provided `Context`. When the pipe
1462    /// becomes ready for writing, `Waker::wake` will be called on the waker.
1463    ///
1464    /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
1465    /// the `Waker` from the `Context` passed to the most recent call is
1466    /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
1467    /// second, independent waker.)
1468    ///
1469    /// This function is intended for cases where creating and pinning a future
1470    /// via [`writable`] is not feasible. Where possible, using [`writable`] is
1471    /// preferred, as this supports polling from multiple tasks at once.
1472    ///
1473    /// # Return value
1474    ///
1475    /// The function returns:
1476    ///
1477    /// * `Poll::Pending` if the pipe is not ready for writing.
1478    /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
1479    /// * `Poll::Ready(Err(e))` if an error is encountered.
1480    ///
1481    /// # Errors
1482    ///
1483    /// This function may encounter any standard I/O error except `WouldBlock`.
1484    ///
1485    /// [`writable`]: method@Self::writable
1486    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1487        self.io.registration().poll_write_ready(cx).map_ok(|_| ())
1488    }
1489
1490    /// Tries to write a buffer to the pipe, returning how many bytes were
1491    /// written.
1492    ///
1493    /// The function will attempt to write the entire contents of `buf`, but
1494    /// only part of the buffer may be written.
1495    ///
1496    /// This function is usually paired with `writable()`.
1497    ///
1498    /// # Return
1499    ///
1500    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1501    /// number of bytes written. If the pipe is not ready to write data,
1502    /// `Err(io::ErrorKind::WouldBlock)` is returned.
1503    ///
1504    /// # Examples
1505    ///
1506    /// ```no_run
1507    /// use tokio::net::windows::named_pipe;
1508    /// use std::error::Error;
1509    /// use std::io;
1510    ///
1511    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write";
1512    ///
1513    /// #[tokio::main]
1514    /// async fn main() -> Result<(), Box<dyn Error>> {
1515    ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1516    ///
1517    ///     loop {
1518    ///         // Wait for the pipe to be writable
1519    ///         client.writable().await?;
1520    ///
1521    ///         // Try to write data, this may still fail with `WouldBlock`
1522    ///         // if the readiness event is a false positive.
1523    ///         match client.try_write(b"hello world") {
1524    ///             Ok(n) => {
1525    ///                 break;
1526    ///             }
1527    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1528    ///                 continue;
1529    ///             }
1530    ///             Err(e) => {
1531    ///                 return Err(e.into());
1532    ///             }
1533    ///         }
1534    ///     }
1535    ///
1536    ///     Ok(())
1537    /// }
1538    /// ```
1539    pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
1540        self.io
1541            .registration()
1542            .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
1543    }
1544
1545    /// Tries to write several buffers to the pipe, returning how many bytes
1546    /// were written.
1547    ///
1548    /// Data is written from each buffer in order, with the final buffer read
    /// from possibly being only partially consumed. This method behaves
1550    /// equivalently to a single call to [`try_write()`] with concatenated
1551    /// buffers.
1552    ///
1553    /// This function is usually paired with `writable()`.
1554    ///
1555    /// [`try_write()`]: NamedPipeClient::try_write()
1556    ///
1557    /// # Return
1558    ///
1559    /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1560    /// number of bytes written. If the pipe is not ready to write data,
1561    /// `Err(io::ErrorKind::WouldBlock)` is returned.
1562    ///
1563    /// # Examples
1564    ///
1565    /// ```no_run
1566    /// use tokio::net::windows::named_pipe;
1567    /// use std::error::Error;
1568    /// use std::io;
1569    ///
1570    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write-vectored";
1571    ///
1572    /// #[tokio::main]
1573    /// async fn main() -> Result<(), Box<dyn Error>> {
1574    ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1575    ///
1576    ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
1577    ///
1578    ///     loop {
1579    ///         // Wait for the pipe to be writable
1580    ///         client.writable().await?;
1581    ///
1582    ///         // Try to write data, this may still fail with `WouldBlock`
1583    ///         // if the readiness event is a false positive.
1584    ///         match client.try_write_vectored(&bufs) {
1585    ///             Ok(n) => {
1586    ///                 break;
1587    ///             }
1588    ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1589    ///                 continue;
1590    ///             }
1591    ///             Err(e) => {
1592    ///                 return Err(e.into());
1593    ///             }
1594    ///         }
1595    ///     }
1596    ///
1597    ///     Ok(())
1598    /// }
1599    /// ```
1600    pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
1601        self.io
1602            .registration()
1603            .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
1604    }
1605
1606    /// Tries to read or write from the pipe using a user-provided IO operation.
1607    ///
1608    /// If the pipe is ready, the provided closure is called. The closure
1609    /// should attempt to perform IO operation from the pipe by manually
1610    /// calling the appropriate syscall. If the operation fails because the
1611    /// pipe is not actually ready, then the closure should return a
1612    /// `WouldBlock` error and the readiness flag is cleared. The return value
1613    /// of the closure is then returned by `try_io`.
1614    ///
1615    /// If the pipe is not ready, then the closure is not called
1616    /// and a `WouldBlock` error is returned.
1617    ///
1618    /// The closure should only return a `WouldBlock` error if it has performed
1619    /// an IO operation on the pipe that failed due to the pipe not being
1620    /// ready. Returning a `WouldBlock` error in any other situation will
1621    /// incorrectly clear the readiness flag, which can cause the pipe to
1622    /// behave incorrectly.
1623    ///
1624    /// The closure should not perform the IO operation using any of the methods
1625    /// defined on the Tokio `NamedPipeClient` type, as this will mess with the
1626    /// readiness flag and can cause the pipe to behave incorrectly.
1627    ///
1628    /// This method is not intended to be used with combined interests.
1629    /// The closure should perform only one type of IO operation, so it should not
1630    /// require more than one ready state. This method may panic or sleep forever
1631    /// if it is called with a combined interest.
1632    ///
1633    /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1634    ///
1635    /// [`readable()`]: NamedPipeClient::readable()
1636    /// [`writable()`]: NamedPipeClient::writable()
1637    /// [`ready()`]: NamedPipeClient::ready()
1638    pub fn try_io<R>(
1639        &self,
1640        interest: Interest,
1641        f: impl FnOnce() -> io::Result<R>,
1642    ) -> io::Result<R> {
1643        self.io.registration().try_io(interest, f)
1644    }
1645
1646    /// Reads or writes from the pipe using a user-provided IO operation.
1647    ///
1648    /// The readiness of the pipe is awaited and when the pipe is ready,
1649    /// the provided closure is called. The closure should attempt to perform
1650    /// IO operation on the pipe by manually calling the appropriate syscall.
1651    /// If the operation fails because the pipe is not actually ready,
1652    /// then the closure should return a `WouldBlock` error. In such case the
1653    /// readiness flag is cleared and the pipe readiness is awaited again.
1654    /// This loop is repeated until the closure returns an `Ok` or an error
1655    /// other than `WouldBlock`.
1656    ///
1657    /// The closure should only return a `WouldBlock` error if it has performed
1658    /// an IO operation on the pipe that failed due to the pipe not being
1659    /// ready. Returning a `WouldBlock` error in any other situation will
1660    /// incorrectly clear the readiness flag, which can cause the pipe to
1661    /// behave incorrectly.
1662    ///
1663    /// The closure should not perform the IO operation using any of the methods
1664    /// defined on the Tokio `NamedPipeClient` type, as this will mess with the
1665    /// readiness flag and can cause the pipe to behave incorrectly.
1666    ///
1667    /// This method is not intended to be used with combined interests.
1668    /// The closure should perform only one type of IO operation, so it should not
1669    /// require more than one ready state. This method may panic or sleep forever
1670    /// if it is called with a combined interest.
1671    pub async fn async_io<R>(
1672        &self,
1673        interest: Interest,
1674        f: impl FnMut() -> io::Result<R>,
1675    ) -> io::Result<R> {
1676        self.io.registration().async_io(interest, f).await
1677    }
1678}
1679
1680impl AsyncRead for NamedPipeClient {
1681    fn poll_read(
1682        self: Pin<&mut Self>,
1683        cx: &mut Context<'_>,
1684        buf: &mut ReadBuf<'_>,
1685    ) -> Poll<io::Result<()>> {
1686        unsafe { self.io.poll_read(cx, buf) }
1687    }
1688}
1689
1690impl AsyncWrite for NamedPipeClient {
1691    fn poll_write(
1692        self: Pin<&mut Self>,
1693        cx: &mut Context<'_>,
1694        buf: &[u8],
1695    ) -> Poll<io::Result<usize>> {
1696        self.io.poll_write(cx, buf)
1697    }
1698
1699    fn poll_write_vectored(
1700        self: Pin<&mut Self>,
1701        cx: &mut Context<'_>,
1702        bufs: &[io::IoSlice<'_>],
1703    ) -> Poll<io::Result<usize>> {
1704        self.io.poll_write_vectored(cx, bufs)
1705    }
1706
1707    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1708        Poll::Ready(Ok(()))
1709    }
1710
1711    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1712        self.poll_flush(cx)
1713    }
1714}
1715
1716impl AsRawHandle for NamedPipeClient {
1717    fn as_raw_handle(&self) -> RawHandle {
1718        self.io.as_raw_handle()
1719    }
1720}
1721
1722impl AsHandle for NamedPipeClient {
1723    fn as_handle(&self) -> BorrowedHandle<'_> {
1724        unsafe { BorrowedHandle::borrow_raw(self.as_raw_handle()) }
1725    }
1726}
1727
/// A builder structure for constructing a named pipe server with named
/// pipe-specific options. It is required for named pipe servers that want to
/// modify pipe-related options.
///
/// See [`ServerOptions::create`].
#[derive(Debug, Clone)]
pub struct ServerOptions {
    // dwOpenMode
    // Adds `PIPE_ACCESS_INBOUND` when set.
    access_inbound: bool,
    // Adds `PIPE_ACCESS_OUTBOUND` when set.
    access_outbound: bool,
    // Adds `FILE_FLAG_FIRST_PIPE_INSTANCE` when set.
    first_pipe_instance: bool,
    // Adds `WRITE_DAC` when set.
    write_dac: bool,
    // Adds `WRITE_OWNER` when set.
    write_owner: bool,
    // Adds `ACCESS_SYSTEM_SECURITY` when set.
    access_system_security: bool,
    // dwPipeMode
    // Selects the `PIPE_TYPE_*` / `PIPE_READMODE_*` pair (message or byte).
    pipe_mode: PipeMode,
    // Chooses `PIPE_REJECT_REMOTE_CLIENTS` over `PIPE_ACCEPT_REMOTE_CLIENTS`.
    reject_remote_clients: bool,
    // other options
    // Passed to `CreateNamedPipeW` as the instance limit (`nMaxInstances`).
    max_instances: u32,
    // Passed to `CreateNamedPipeW` as the output buffer size, in bytes.
    out_buffer_size: u32,
    // Passed to `CreateNamedPipeW` as the input buffer size, in bytes.
    in_buffer_size: u32,
    // Passed to `CreateNamedPipeW` as the default timeout; `new` sets it to 0.
    default_timeout: u32,
}
1751
1752impl ServerOptions {
1753    /// Creates a new named pipe builder with the default settings.
1754    ///
1755    /// ```
1756    /// use tokio::net::windows::named_pipe::ServerOptions;
1757    ///
1758    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-new";
1759    ///
1760    /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1761    /// let server = ServerOptions::new().create(PIPE_NAME)?;
1762    /// # Ok(()) }
1763    /// ```
1764    pub fn new() -> ServerOptions {
1765        ServerOptions {
1766            access_inbound: true,
1767            access_outbound: true,
1768            first_pipe_instance: false,
1769            write_dac: false,
1770            write_owner: false,
1771            access_system_security: false,
1772            pipe_mode: PipeMode::Byte,
1773            reject_remote_clients: true,
1774            max_instances: windows_sys::PIPE_UNLIMITED_INSTANCES,
1775            out_buffer_size: 65536,
1776            in_buffer_size: 65536,
1777            default_timeout: 0,
1778        }
1779    }
1780
1781    /// The pipe mode.
1782    ///
1783    /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
1784    /// documentation of what each mode means.
1785    ///
1786    /// This corresponds to specifying `PIPE_TYPE_` and `PIPE_READMODE_` in  [`dwPipeMode`].
1787    ///
1788    /// [`dwPipeMode`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
1789    pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
1790        self.pipe_mode = pipe_mode;
1791        self
1792    }
1793
1794    /// The flow of data in the pipe goes from client to server only.
1795    ///
1796    /// This corresponds to setting [`PIPE_ACCESS_INBOUND`].
1797    ///
1798    /// [`PIPE_ACCESS_INBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_inbound
1799    ///
1800    /// # Errors
1801    ///
1802    /// Server side prevents connecting by denying inbound access, client errors
1803    /// with [`std::io::ErrorKind::PermissionDenied`] when attempting to create
1804    /// the connection.
1805    ///
1806    /// ```
1807    /// use std::io;
1808    /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1809    ///
1810    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err1";
1811    ///
1812    /// # #[tokio::main] async fn main() -> io::Result<()> {
1813    /// let _server = ServerOptions::new()
1814    ///     .access_inbound(false)
1815    ///     .create(PIPE_NAME)?;
1816    ///
1817    /// let e = ClientOptions::new()
1818    ///     .open(PIPE_NAME)
1819    ///     .unwrap_err();
1820    ///
1821    /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1822    /// # Ok(()) }
1823    /// ```
1824    ///
1825    /// Disabling writing allows a client to connect, but errors with
1826    /// [`std::io::ErrorKind::PermissionDenied`] if a write is attempted.
1827    ///
1828    /// ```
1829    /// use std::io;
1830    /// use tokio::io::AsyncWriteExt;
1831    /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1832    ///
1833    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err2";
1834    ///
1835    /// # #[tokio::main] async fn main() -> io::Result<()> {
1836    /// let server = ServerOptions::new()
1837    ///     .access_inbound(false)
1838    ///     .create(PIPE_NAME)?;
1839    ///
1840    /// let mut client = ClientOptions::new()
1841    ///     .write(false)
1842    ///     .open(PIPE_NAME)?;
1843    ///
1844    /// server.connect().await?;
1845    ///
1846    /// let e = client.write(b"ping").await.unwrap_err();
1847    /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1848    /// # Ok(()) }
1849    /// ```
1850    ///
1851    /// # Examples
1852    ///
1853    /// A unidirectional named pipe that only supports server-to-client
1854    /// communication.
1855    ///
1856    /// ```
1857    /// use std::io;
1858    /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1859    /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1860    ///
1861    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound";
1862    ///
1863    /// # #[tokio::main] async fn main() -> io::Result<()> {
1864    /// let mut server = ServerOptions::new()
1865    ///     .access_inbound(false)
1866    ///     .create(PIPE_NAME)?;
1867    ///
1868    /// let mut client = ClientOptions::new()
1869    ///     .write(false)
1870    ///     .open(PIPE_NAME)?;
1871    ///
1872    /// server.connect().await?;
1873    ///
1874    /// let write = server.write_all(b"ping");
1875    ///
1876    /// let mut buf = [0u8; 4];
1877    /// let read = client.read_exact(&mut buf);
1878    ///
1879    /// let ((), read) = tokio::try_join!(write, read)?;
1880    ///
1881    /// assert_eq!(read, 4);
1882    /// assert_eq!(&buf[..], b"ping");
1883    /// # Ok(()) }
1884    /// ```
1885    pub fn access_inbound(&mut self, allowed: bool) -> &mut Self {
1886        self.access_inbound = allowed;
1887        self
1888    }
1889
1890    /// The flow of data in the pipe goes from server to client only.
1891    ///
1892    /// This corresponds to setting [`PIPE_ACCESS_OUTBOUND`].
1893    ///
1894    /// [`PIPE_ACCESS_OUTBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_outbound
1895    ///
1896    /// # Errors
1897    ///
1898    /// Server side prevents connecting by denying outbound access, client
1899    /// errors with [`std::io::ErrorKind::PermissionDenied`] when attempting to
1900    /// create the connection.
1901    ///
1902    /// ```
1903    /// use std::io;
1904    /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1905    ///
1906    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err1";
1907    ///
1908    /// # #[tokio::main] async fn main() -> io::Result<()> {
1909    /// let server = ServerOptions::new()
1910    ///     .access_outbound(false)
1911    ///     .create(PIPE_NAME)?;
1912    ///
1913    /// let e = ClientOptions::new()
1914    ///     .open(PIPE_NAME)
1915    ///     .unwrap_err();
1916    ///
1917    /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1918    /// # Ok(()) }
1919    /// ```
1920    ///
1921    /// Disabling reading allows a client to connect, but attempting to read
1922    /// will error with [`std::io::ErrorKind::PermissionDenied`].
1923    ///
1924    /// ```
1925    /// use std::io;
1926    /// use tokio::io::AsyncReadExt;
1927    /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1928    ///
1929    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err2";
1930    ///
1931    /// # #[tokio::main] async fn main() -> io::Result<()> {
1932    /// let server = ServerOptions::new()
1933    ///     .access_outbound(false)
1934    ///     .create(PIPE_NAME)?;
1935    ///
1936    /// let mut client = ClientOptions::new()
1937    ///     .read(false)
1938    ///     .open(PIPE_NAME)?;
1939    ///
1940    /// server.connect().await?;
1941    ///
1942    /// let mut buf = [0u8; 4];
1943    /// let e = client.read(&mut buf).await.unwrap_err();
1944    /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1945    /// # Ok(()) }
1946    /// ```
1947    ///
1948    /// # Examples
1949    ///
1950    /// A unidirectional named pipe that only supports client-to-server
1951    /// communication.
1952    ///
1953    /// ```
1954    /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1955    /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1956    ///
1957    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound";
1958    ///
1959    /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1960    /// let mut server = ServerOptions::new()
1961    ///     .access_outbound(false)
1962    ///     .create(PIPE_NAME)?;
1963    ///
1964    /// let mut client = ClientOptions::new()
1965    ///     .read(false)
1966    ///     .open(PIPE_NAME)?;
1967    ///
1968    /// server.connect().await?;
1969    ///
1970    /// let write = client.write_all(b"ping");
1971    ///
1972    /// let mut buf = [0u8; 4];
1973    /// let read = server.read_exact(&mut buf);
1974    ///
1975    /// let ((), read) = tokio::try_join!(write, read)?;
1976    ///
1977    /// println!("done reading and writing");
1978    ///
1979    /// assert_eq!(read, 4);
1980    /// assert_eq!(&buf[..], b"ping");
1981    /// # Ok(()) }
1982    /// ```
1983    pub fn access_outbound(&mut self, allowed: bool) -> &mut Self {
1984        self.access_outbound = allowed;
1985        self
1986    }
1987
1988    /// If you attempt to create multiple instances of a pipe with this flag
1989    /// set, creation of the first server instance succeeds, but creation of any
1990    /// subsequent instances will fail with
1991    /// [`std::io::ErrorKind::PermissionDenied`].
1992    ///
1993    /// This option is intended to be used with servers that want to ensure that
1994    /// they are the only process listening for clients on a given named pipe.
1995    /// This is accomplished by enabling it for the first server instance
1996    /// created in a process.
1997    ///
1998    /// This corresponds to setting [`FILE_FLAG_FIRST_PIPE_INSTANCE`].
1999    ///
2000    /// # Errors
2001    ///
2002    /// If this option is set and more than one instance of the server for a
2003    /// given named pipe exists, calling [`create`] will fail with
2004    /// [`std::io::ErrorKind::PermissionDenied`].
2005    ///
2006    /// ```
2007    /// use std::io;
2008    /// use tokio::net::windows::named_pipe::ServerOptions;
2009    ///
2010    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance-error";
2011    ///
2012    /// # #[tokio::main] async fn main() -> io::Result<()> {
2013    /// let server1 = ServerOptions::new()
2014    ///     .first_pipe_instance(true)
2015    ///     .create(PIPE_NAME)?;
2016    ///
2017    /// // Second server errs, since it's not the first instance.
2018    /// let e = ServerOptions::new()
2019    ///     .first_pipe_instance(true)
2020    ///     .create(PIPE_NAME)
2021    ///     .unwrap_err();
2022    ///
2023    /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
2024    /// # Ok(()) }
2025    /// ```
2026    ///
2027    /// # Examples
2028    ///
2029    /// ```
2030    /// use std::io;
2031    /// use tokio::net::windows::named_pipe::ServerOptions;
2032    ///
2033    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance";
2034    ///
2035    /// # #[tokio::main] async fn main() -> io::Result<()> {
2036    /// let mut builder = ServerOptions::new();
2037    /// builder.first_pipe_instance(true);
2038    ///
2039    /// let server = builder.create(PIPE_NAME)?;
2040    /// let e = builder.create(PIPE_NAME).unwrap_err();
2041    /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
2042    /// drop(server);
2043    ///
2044    /// // OK: since, we've closed the other instance.
2045    /// let _server2 = builder.create(PIPE_NAME)?;
2046    /// # Ok(()) }
2047    /// ```
2048    ///
2049    /// [`create`]: ServerOptions::create
2050    /// [`FILE_FLAG_FIRST_PIPE_INSTANCE`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_first_pipe_instance
2051    pub fn first_pipe_instance(&mut self, first: bool) -> &mut Self {
2052        self.first_pipe_instance = first;
2053        self
2054    }
2055
2056    /// Requests permission to modify the pipe's discretionary access control list.
2057    ///
2058    /// This corresponds to setting [`WRITE_DAC`] in dwOpenMode.
2059    ///
2060    /// # Examples
2061    ///
2062    /// ```
2063    /// use std::{io, os::windows::prelude::AsRawHandle, ptr};
2064    ///
2065    /// use tokio::net::windows::named_pipe::ServerOptions;
2066    /// use windows_sys::{
2067    ///     Win32::Foundation::ERROR_SUCCESS,
2068    ///     Win32::Security::DACL_SECURITY_INFORMATION,
2069    ///     Win32::Security::Authorization::{SetSecurityInfo, SE_KERNEL_OBJECT},
2070    /// };
2071    ///
2072    /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe";
2073    ///
2074    /// # #[tokio::main] async fn main() -> io::Result<()> {
2075    /// let mut pipe_template = ServerOptions::new();
2076    /// pipe_template.write_dac(true);
2077    /// let pipe = pipe_template.create(PIPE_NAME)?;
2078    ///
2079    /// unsafe {
2080    ///     assert_eq!(
2081    ///         ERROR_SUCCESS,
2082    ///         SetSecurityInfo(
2083    ///             pipe.as_raw_handle() as _,
2084    ///             SE_KERNEL_OBJECT,
2085    ///             DACL_SECURITY_INFORMATION,
2086    ///             ptr::null_mut(),
2087    ///             ptr::null_mut(),
2088    ///             ptr::null_mut(),
2089    ///             ptr::null_mut(),
2090    ///         )
2091    ///     );
2092    /// }
2093    ///
2094    /// # Ok(()) }
2095    /// ```
2096    ///
2097    /// ```
2098    /// use std::{io, os::windows::prelude::AsRawHandle, ptr};
2099    ///
2100    /// use tokio::net::windows::named_pipe::ServerOptions;
2101    /// use windows_sys::{
2102    ///     Win32::Foundation::ERROR_ACCESS_DENIED,
2103    ///     Win32::Security::DACL_SECURITY_INFORMATION,
2104    ///     Win32::Security::Authorization::{SetSecurityInfo, SE_KERNEL_OBJECT},
2105    /// };
2106    ///
2107    /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe_fail";
2108    ///
2109    /// # #[tokio::main] async fn main() -> io::Result<()> {
2110    /// let mut pipe_template = ServerOptions::new();
2111    /// pipe_template.write_dac(false);
2112    /// let pipe = pipe_template.create(PIPE_NAME)?;
2113    ///
2114    /// unsafe {
2115    ///     assert_eq!(
2116    ///         ERROR_ACCESS_DENIED,
2117    ///         SetSecurityInfo(
2118    ///             pipe.as_raw_handle() as _,
2119    ///             SE_KERNEL_OBJECT,
2120    ///             DACL_SECURITY_INFORMATION,
2121    ///             ptr::null_mut(),
2122    ///             ptr::null_mut(),
2123    ///             ptr::null_mut(),
2124    ///             ptr::null_mut(),
2125    ///         )
2126    ///     );
2127    /// }
2128    ///
2129    /// # Ok(()) }
2130    /// ```
2131    ///
2132    /// [`WRITE_DAC`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2133    pub fn write_dac(&mut self, requested: bool) -> &mut Self {
2134        self.write_dac = requested;
2135        self
2136    }
2137
2138    /// Requests permission to modify the pipe's owner.
2139    ///
2140    /// This corresponds to setting [`WRITE_OWNER`] in dwOpenMode.
2141    ///
2142    /// [`WRITE_OWNER`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2143    pub fn write_owner(&mut self, requested: bool) -> &mut Self {
2144        self.write_owner = requested;
2145        self
2146    }
2147
2148    /// Requests permission to modify the pipe's system access control list.
2149    ///
2150    /// This corresponds to setting [`ACCESS_SYSTEM_SECURITY`] in dwOpenMode.
2151    ///
2152    /// [`ACCESS_SYSTEM_SECURITY`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2153    pub fn access_system_security(&mut self, requested: bool) -> &mut Self {
2154        self.access_system_security = requested;
2155        self
2156    }
2157
2158    /// Indicates whether this server can accept remote clients or not. Remote
2159    /// clients are disabled by default.
2160    ///
2161    /// This corresponds to setting [`PIPE_REJECT_REMOTE_CLIENTS`].
2162    ///
2163    /// [`PIPE_REJECT_REMOTE_CLIENTS`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_reject_remote_clients
2164    pub fn reject_remote_clients(&mut self, reject: bool) -> &mut Self {
2165        self.reject_remote_clients = reject;
2166        self
2167    }
2168
2169    /// The maximum number of instances that can be created for this pipe. The
2170    /// first instance of the pipe can specify this value; the same number must
2171    /// be specified for other instances of the pipe. Acceptable values are in
2172    /// the range 1 through 254. The default value is unlimited.
2173    ///
2174    /// This corresponds to specifying [`nMaxInstances`].
2175    ///
2176    /// [`nMaxInstances`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2177    ///
2178    /// # Errors
2179    ///
    /// The same value of `max_instances` must be used by all servers. Creating
    /// an additional server instance with a mismatching value might result in
    /// an error.
2183    ///
2184    /// ```
2185    /// use std::io;
2186    /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
2187    /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
2188    ///
2189    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-max-instances";
2190    ///
2191    /// # #[tokio::main] async fn main() -> io::Result<()> {
2192    /// let mut server = ServerOptions::new();
2193    /// server.max_instances(2);
2194    ///
2195    /// let s1 = server.create(PIPE_NAME)?;
2196    /// let c1 = ClientOptions::new().open(PIPE_NAME);
2197    ///
2198    /// let s2 = server.create(PIPE_NAME)?;
2199    /// let c2 = ClientOptions::new().open(PIPE_NAME);
2200    ///
2201    /// // Too many servers!
2202    /// let e = server.create(PIPE_NAME).unwrap_err();
2203    /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
2204    ///
2205    /// // Still too many servers even if we specify a higher value!
2206    /// let e = server.max_instances(100).create(PIPE_NAME).unwrap_err();
2207    /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
2208    /// # Ok(()) }
2209    /// ```
2210    ///
2211    /// # Panics
2212    ///
2213    /// This function will panic if more than 254 instances are specified. If
2214    /// you do not wish to set an instance limit, leave it unspecified.
2215    ///
2216    /// ```should_panic
2217    /// use tokio::net::windows::named_pipe::ServerOptions;
2218    ///
2219    /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2220    /// let builder = ServerOptions::new().max_instances(255);
2221    /// # Ok(()) }
2222    /// ```
2223    #[track_caller]
2224    pub fn max_instances(&mut self, instances: usize) -> &mut Self {
2225        assert!(instances < 255, "cannot specify more than 254 instances");
2226        self.max_instances = instances as u32;
2227        self
2228    }
2229
2230    /// The number of bytes to reserve for the output buffer.
2231    ///
2232    /// This corresponds to specifying [`nOutBufferSize`].
2233    ///
2234    /// [`nOutBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2235    pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
2236        self.out_buffer_size = buffer;
2237        self
2238    }
2239
2240    /// The number of bytes to reserve for the input buffer.
2241    ///
2242    /// This corresponds to specifying [`nInBufferSize`].
2243    ///
2244    /// [`nInBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2245    pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
2246        self.in_buffer_size = buffer;
2247        self
2248    }
2249
2250    /// Creates the named pipe identified by `addr` for use as a server.
2251    ///
2252    /// This uses the [`CreateNamedPipe`] function.
2253    ///
2254    /// [`CreateNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2255    ///
2256    /// # Errors
2257    ///
2258    /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2259    /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2260    ///
2261    /// [Tokio Runtime]: crate::runtime::Runtime
2262    /// [enabled I/O]: crate::runtime::Builder::enable_io
2263    ///
2264    /// # Examples
2265    ///
2266    /// ```
2267    /// use tokio::net::windows::named_pipe::ServerOptions;
2268    ///
2269    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-create";
2270    ///
2271    /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2272    /// let server = ServerOptions::new().create(PIPE_NAME)?;
2273    /// # Ok(()) }
2274    /// ```
2275    pub fn create(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeServer> {
2276        // Safety: We're calling create_with_security_attributes_raw w/ a null
2277        // pointer which disables it.
2278        unsafe { self.create_with_security_attributes_raw(addr, ptr::null_mut()) }
2279    }
2280
2281    /// Creates the named pipe identified by `addr` for use as a server.
2282    ///
2283    /// This is the same as [`create`] except that it supports providing the raw
2284    /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
2285    /// as the `lpSecurityAttributes` argument to [`CreateFile`].
2286    ///
2287    /// # Errors
2288    ///
2289    /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2290    /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2291    ///
2292    /// [Tokio Runtime]: crate::runtime::Runtime
2293    /// [enabled I/O]: crate::runtime::Builder::enable_io
2294    ///
2295    /// # Safety
2296    ///
2297    /// The `attrs` argument must either be null or point at a valid instance of
2298    /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
2299    /// behavior is identical to calling the [`create`] method.
2300    ///
2301    /// [`create`]: ServerOptions::create
2302    /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2303    /// [`SECURITY_ATTRIBUTES`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Security/struct.SECURITY_ATTRIBUTES.html
2304    pub unsafe fn create_with_security_attributes_raw(
2305        &self,
2306        addr: impl AsRef<OsStr>,
2307        attrs: *mut c_void,
2308    ) -> io::Result<NamedPipeServer> {
2309        let addr = encode_addr(addr);
2310
2311        let pipe_mode = {
2312            let mut mode = if matches!(self.pipe_mode, PipeMode::Message) {
2313                windows_sys::PIPE_TYPE_MESSAGE | windows_sys::PIPE_READMODE_MESSAGE
2314            } else {
2315                windows_sys::PIPE_TYPE_BYTE | windows_sys::PIPE_READMODE_BYTE
2316            };
2317            if self.reject_remote_clients {
2318                mode |= windows_sys::PIPE_REJECT_REMOTE_CLIENTS;
2319            } else {
2320                mode |= windows_sys::PIPE_ACCEPT_REMOTE_CLIENTS;
2321            }
2322            mode
2323        };
2324        let open_mode = {
2325            let mut mode = windows_sys::FILE_FLAG_OVERLAPPED;
2326            if self.access_inbound {
2327                mode |= windows_sys::PIPE_ACCESS_INBOUND;
2328            }
2329            if self.access_outbound {
2330                mode |= windows_sys::PIPE_ACCESS_OUTBOUND;
2331            }
2332            if self.first_pipe_instance {
2333                mode |= windows_sys::FILE_FLAG_FIRST_PIPE_INSTANCE;
2334            }
2335            if self.write_dac {
2336                mode |= windows_sys::WRITE_DAC;
2337            }
2338            if self.write_owner {
2339                mode |= windows_sys::WRITE_OWNER;
2340            }
2341            if self.access_system_security {
2342                mode |= windows_sys::ACCESS_SYSTEM_SECURITY;
2343            }
2344            mode
2345        };
2346
2347        let h = windows_sys::CreateNamedPipeW(
2348            addr.as_ptr(),
2349            open_mode,
2350            pipe_mode,
2351            self.max_instances,
2352            self.out_buffer_size,
2353            self.in_buffer_size,
2354            self.default_timeout,
2355            attrs as *mut _,
2356        );
2357
2358        if h == windows_sys::INVALID_HANDLE_VALUE {
2359            return Err(io::Error::last_os_error());
2360        }
2361
2362        NamedPipeServer::from_raw_handle(h as _)
2363    }
2364}
2365
/// A builder suitable for building and interacting with named pipes from the
/// client side.
///
/// See [`ClientOptions::open`].
#[derive(Debug, Clone)]
pub struct ClientOptions {
    // Whether read access (`GENERIC_READ`) is requested; set via `read`.
    generic_read: bool,
    // Whether write access (`GENERIC_WRITE`) is requested; set via `write`.
    generic_write: bool,
    // QOS flags combined with the other flags in the `CreateFile` call.
    security_qos_flags: u32,
    // Pipe mode for the client; `new` initializes it to `PipeMode::Byte`.
    pipe_mode: PipeMode,
}
2377
2378impl ClientOptions {
2379    /// Creates a new named pipe builder with the default settings.
2380    ///
2381    /// ```
2382    /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
2383    ///
2384    /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-new";
2385    ///
2386    /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2387    /// // Server must be created in order for the client creation to succeed.
2388    /// let server = ServerOptions::new().create(PIPE_NAME)?;
2389    /// let client = ClientOptions::new().open(PIPE_NAME)?;
2390    /// # Ok(()) }
2391    /// ```
2392    pub fn new() -> Self {
2393        Self {
2394            generic_read: true,
2395            generic_write: true,
2396            security_qos_flags: windows_sys::SECURITY_IDENTIFICATION
2397                | windows_sys::SECURITY_SQOS_PRESENT,
2398            pipe_mode: PipeMode::Byte,
2399        }
2400    }
2401
2402    /// If the client supports reading data. This is enabled by default.
2403    ///
2404    /// This corresponds to setting [`GENERIC_READ`] in the call to [`CreateFile`].
2405    ///
2406    /// [`GENERIC_READ`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
2407    /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2408    pub fn read(&mut self, allowed: bool) -> &mut Self {
2409        self.generic_read = allowed;
2410        self
2411    }
2412
2413    /// If the created pipe supports writing data. This is enabled by default.
2414    ///
2415    /// This corresponds to setting [`GENERIC_WRITE`] in the call to [`CreateFile`].
2416    ///
2417    /// [`GENERIC_WRITE`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
2418    /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2419    pub fn write(&mut self, allowed: bool) -> &mut Self {
2420        self.generic_write = allowed;
2421        self
2422    }
2423
2424    /// Sets qos flags which are combined with other flags and attributes in the
2425    /// call to [`CreateFile`].
2426    ///
2427    /// By default `security_qos_flags` is set to [`SECURITY_IDENTIFICATION`],
2428    /// calling this function would override that value completely with the
2429    /// argument specified.
2430    ///
2431    /// When `security_qos_flags` is not set, a malicious program can gain the
2432    /// elevated privileges of a privileged Rust process when it allows opening
2433    /// user-specified paths, by tricking it into opening a named pipe. So
2434    /// arguably `security_qos_flags` should also be set when opening arbitrary
2435    /// paths. However the bits can then conflict with other flags, specifically
2436    /// `FILE_FLAG_OPEN_NO_RECALL`.
2437    ///
2438    /// For information about possible values, see [Impersonation Levels] on the
2439    /// Windows Dev Center site. The `SECURITY_SQOS_PRESENT` flag is set
2440    /// automatically when using this method.
2441    ///
2442    /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2443    /// [`SECURITY_IDENTIFICATION`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Storage/FileSystem/constant.SECURITY_IDENTIFICATION.html
2444    /// [Impersonation Levels]: https://docs.microsoft.com/en-us/windows/win32/api/winnt/ne-winnt-security_impersonation_level
2445    pub fn security_qos_flags(&mut self, flags: u32) -> &mut Self {
2446        // See: https://github.com/rust-lang/rust/pull/58216
2447        self.security_qos_flags = flags | windows_sys::SECURITY_SQOS_PRESENT;
2448        self
2449    }
2450
2451    /// The pipe mode.
2452    ///
2453    /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
2454    /// documentation of what each mode means.
2455    pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
2456        self.pipe_mode = pipe_mode;
2457        self
2458    }
2459
2460    /// Opens the named pipe identified by `addr`.
2461    ///
2462    /// This opens the client using [`CreateFile`] with the
2463    /// `dwCreationDisposition` option set to `OPEN_EXISTING`.
2464    ///
2465    /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2466    ///
2467    /// # Errors
2468    ///
2469    /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2470    /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2471    ///
2472    /// There are a few errors you need to take into account when creating a
2473    /// named pipe on the client side:
2474    ///
2475    /// * [`std::io::ErrorKind::NotFound`] - This indicates that the named pipe
2476    ///   does not exist. Presumably the server is not up.
2477    /// * [`ERROR_PIPE_BUSY`] - This error is raised when the named pipe exists,
2478    ///   but the server is not currently waiting for a connection. Please see the
2479    ///   examples for how to check for this error.
2480    ///
2481    /// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
2482    /// [enabled I/O]: crate::runtime::Builder::enable_io
2483    /// [Tokio Runtime]: crate::runtime::Runtime
2484    ///
2485    /// A connect loop that waits until a pipe becomes available looks like
2486    /// this:
2487    ///
2488    /// ```no_run
2489    /// use std::time::Duration;
2490    /// use tokio::net::windows::named_pipe::ClientOptions;
2491    /// use tokio::time;
2492    /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
2493    ///
2494    /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
2495    ///
2496    /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2497    /// let client = loop {
2498    ///     match ClientOptions::new().open(PIPE_NAME) {
2499    ///         Ok(client) => break client,
2500    ///         Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
2501    ///         Err(e) => return Err(e),
2502    ///     }
2503    ///
2504    ///     time::sleep(Duration::from_millis(50)).await;
2505    /// };
2506    ///
2507    /// // use the connected client.
2508    /// # Ok(()) }
2509    /// ```
2510    pub fn open(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeClient> {
2511        // Safety: We're calling open_with_security_attributes_raw w/ a null
2512        // pointer which disables it.
2513        unsafe { self.open_with_security_attributes_raw(addr, ptr::null_mut()) }
2514    }
2515
2516    /// Opens the named pipe identified by `addr`.
2517    ///
2518    /// This is the same as [`open`] except that it supports providing the raw
2519    /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
2520    /// as the `lpSecurityAttributes` argument to [`CreateFile`].
2521    ///
2522    /// # Safety
2523    ///
2524    /// The `attrs` argument must either be null or point at a valid instance of
2525    /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
2526    /// behavior is identical to calling the [`open`] method.
2527    ///
2528    /// [`open`]: ClientOptions::open
2529    /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2530    /// [`SECURITY_ATTRIBUTES`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Security/struct.SECURITY_ATTRIBUTES.html
2531    pub unsafe fn open_with_security_attributes_raw(
2532        &self,
2533        addr: impl AsRef<OsStr>,
2534        attrs: *mut c_void,
2535    ) -> io::Result<NamedPipeClient> {
2536        let addr = encode_addr(addr);
2537
2538        let desired_access = {
2539            let mut access = 0;
2540            if self.generic_read {
2541                access |= windows_sys::GENERIC_READ;
2542            }
2543            if self.generic_write {
2544                access |= windows_sys::GENERIC_WRITE;
2545            }
2546            access
2547        };
2548
2549        // NB: We could use a platform specialized `OpenOptions` here, but since
2550        // we have access to windows_sys it ultimately doesn't hurt to use
2551        // `CreateFile` explicitly since it allows the use of our already
2552        // well-structured wide `addr` to pass into CreateFileW.
2553        let h = windows_sys::CreateFileW(
2554            addr.as_ptr(),
2555            desired_access,
2556            0,
2557            attrs as *mut _,
2558            windows_sys::OPEN_EXISTING,
2559            self.get_flags(),
2560            null_mut(),
2561        );
2562
2563        if h == windows_sys::INVALID_HANDLE_VALUE {
2564            return Err(io::Error::last_os_error());
2565        }
2566
2567        if matches!(self.pipe_mode, PipeMode::Message) {
2568            let mode = windows_sys::PIPE_READMODE_MESSAGE;
2569            let result =
2570                windows_sys::SetNamedPipeHandleState(h, &mode, ptr::null_mut(), ptr::null_mut());
2571
2572            if result == 0 {
2573                return Err(io::Error::last_os_error());
2574            }
2575        }
2576
2577        NamedPipeClient::from_raw_handle(h as _)
2578    }
2579
2580    fn get_flags(&self) -> u32 {
2581        self.security_qos_flags | windows_sys::FILE_FLAG_OVERLAPPED
2582    }
2583}
2584
/// The pipe mode of a named pipe.
///
/// Set through [`ServerOptions::pipe_mode`] or [`ClientOptions::pipe_mode`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PipeMode {
    /// Data is written to the pipe as a stream of bytes. The pipe does not
    /// distinguish bytes written during different write operations.
    ///
    /// Corresponds to [`PIPE_TYPE_BYTE`].
    ///
    /// [`PIPE_TYPE_BYTE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_BYTE.html
    Byte,
    /// Data is written to the pipe as a stream of messages. The pipe treats the
    /// bytes written during each write operation as a message unit. Any reading
    /// on a named pipe returns [`ERROR_MORE_DATA`] when a message is not read
    /// completely.
    ///
    /// Corresponds to [`PIPE_TYPE_MESSAGE`].
    ///
    /// [`ERROR_MORE_DATA`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_MORE_DATA.html
    /// [`PIPE_TYPE_MESSAGE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_MESSAGE.html
    Message,
}
2609
/// Indicates the end of a named pipe.
///
/// Reported through the `end` field of [`PipeInfo`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PipeEnd {
    /// The named pipe refers to the client end of a named pipe instance.
    ///
    /// Corresponds to [`PIPE_CLIENT_END`].
    ///
    /// [`PIPE_CLIENT_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_CLIENT_END.html
    Client,
    /// The named pipe refers to the server end of a named pipe instance.
    ///
    /// Corresponds to [`PIPE_SERVER_END`].
    ///
    /// [`PIPE_SERVER_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_SERVER_END.html
    Server,
}
2627
/// Information about a named pipe.
///
/// Constructed through [`NamedPipeServer::info`] or [`NamedPipeClient::info`].
///
/// The values are the ones reported by the operating system for the
/// underlying pipe handle.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct PipeInfo {
    /// Indicates the mode of a named pipe.
    ///
    /// This is [`PipeMode::Message`] when the pipe type reported for the
    /// handle is a message pipe, and [`PipeMode::Byte`] otherwise.
    pub mode: PipeMode,
    /// Indicates the end of a named pipe.
    ///
    /// This is [`PipeEnd::Server`] when the handle refers to the server end,
    /// and [`PipeEnd::Client`] otherwise.
    pub end: PipeEnd,
    /// The maximum number of instances that can be created for this pipe.
    pub max_instances: u32,
    /// The number of bytes to reserve for the output buffer.
    pub out_buffer_size: u32,
    /// The number of bytes to reserve for the input buffer.
    pub in_buffer_size: u32,
}
2645
2646/// Encodes an address so that it is a null-terminated wide string.
2647fn encode_addr(addr: impl AsRef<OsStr>) -> Box<[u16]> {
2648    let len = addr.as_ref().encode_wide().count();
2649    let mut vec = Vec::with_capacity(len + 1);
2650    vec.extend(addr.as_ref().encode_wide());
2651    vec.push(0);
2652    vec.into_boxed_slice()
2653}
2654
2655/// Internal function to get the info out of a raw named pipe.
2656unsafe fn named_pipe_info(handle: RawHandle) -> io::Result<PipeInfo> {
2657    let mut flags = 0;
2658    let mut out_buffer_size = 0;
2659    let mut in_buffer_size = 0;
2660    let mut max_instances = 0;
2661
2662    let result = windows_sys::GetNamedPipeInfo(
2663        handle as _,
2664        &mut flags,
2665        &mut out_buffer_size,
2666        &mut in_buffer_size,
2667        &mut max_instances,
2668    );
2669
2670    if result == 0 {
2671        return Err(io::Error::last_os_error());
2672    }
2673
2674    let mut end = PipeEnd::Client;
2675    let mut mode = PipeMode::Byte;
2676
2677    if flags & windows_sys::PIPE_SERVER_END != 0 {
2678        end = PipeEnd::Server;
2679    }
2680
2681    if flags & windows_sys::PIPE_TYPE_MESSAGE != 0 {
2682        mode = PipeMode::Message;
2683    }
2684
2685    Ok(PipeInfo {
2686        end,
2687        mode,
2688        out_buffer_size,
2689        in_buffer_size,
2690        max_instances,
2691    })
2692}